using RedisNaruto.Consts;
using RedisNaruto.Enums;
using RedisNaruto.Internal;
using RedisNaruto.Internal.Models;
using RedisNaruto.Models;
using RedisNaruto.Utils;

namespace RedisNaruto.RedisCommands;

/// <summary>
/// redis stream
/// </summary>
public partial class RedisCommand : IRedisCommand
{
    /// <summary>
    /// Acknowledges one or more messages on behalf of a consumer group (<c>XACK</c>).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="group">Name of the consumer group.</param>
    /// <param name="messageId">Ids of the messages to acknowledge.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>The integer reply of the command.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/>, <paramref name="group"/> or <paramref name="messageId"/> is null.
    /// </exception>
    public async Task<int> XAckAsync(string key, string group, string[] messageId,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(group);
        ArgumentNullException.ThrowIfNull(messageId);

        // Argument layout: key, group, then every message id.
        var argv = new List<object>(messageId.Length + 2) {key, group};
        argv.AddRange(messageId);

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        return await client.ExecuteAsync<int>(new Command(RedisCommandName.XAck, argv.ToArray()));
    }

    /// <summary>
    /// Appends an entry to a stream (<c>XADD</c>), optionally trimming the stream in the same call.
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="data">
    /// Field/value pairs of the entry; each key and value is appended to the command arguments in
    /// enumeration order. NOTE(review): the original comment says the data ends up JSON-formatted
    /// in Redis; that would happen in the command serializer, which is not visible here.
    /// </param>
    /// <param name="messageId">
    /// Id of the new entry. The default asks Redis to generate the id; a custom id has to be
    /// greater than the newest id already in the stream.
    /// </param>
    /// <param name="maxMin">Trimming strategy to apply while adding; <c>Default</c> sends no trimming clause.</param>
    /// <param name="threshold">Trimming threshold; required whenever <paramref name="maxMin"/> is not <c>Default</c>.</param>
    /// <param name="limitCount">
    /// Value for the <c>LIMIT</c> option (Redis 6.2.0+). Only allowed together with one of the
    /// "Nearly" (approximate) strategies.
    /// </param>
    /// <param name="isCreateStream">
    /// When false the no-make-stream option is sent so a missing stream is not created (Redis 6.2.0+).
    /// </param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>The string reply of the command (the id of the added entry).</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/>, <paramref name="data"/> or <paramref name="messageId"/> is null, or
    /// <paramref name="threshold"/> is null while a trimming strategy is selected.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="limitCount"/> is given without an approximate ("Nearly") strategy.
    /// </exception>
    public async Task<string> XAddAsync(string key, Dictionary<string, object> data,
        string messageId = StreamConst.AutoGeneratedId,
        StreamMaxMinEnum maxMin = StreamMaxMinEnum.Default,
        string threshold = default, long? limitCount = null, bool isCreateStream = true,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(data);
        ArgumentNullException.ThrowIfNull(messageId);

        // Validate the trimming options up front, before any argument is assembled.
        if (maxMin != StreamMaxMinEnum.Default)
        {
            // A trimming clause without a threshold would put a null into the command.
            ArgumentNullException.ThrowIfNull(threshold);
        }

        // LIMIT is only accepted by the server together with the approximate (~) form, so the
        // exact modes *and* the "no trimming" default have to be rejected.
        if (limitCount != null &&
            maxMin is not (StreamMaxMinEnum.MaxNearly or StreamMaxMinEnum.MinIdNearly))
        {
            throw new InvalidOperationException("当指定limitCount的时候，只能选择 Nearly");
        }

        var argv = new List<object> {key};
        if (!isCreateStream)
        {
            argv.Add(StreamConst.NOMKSTREAM);
        }

        // Trimming clause: strategy keyword followed by the exact/approximate marker.
        switch (maxMin)
        {
            case StreamMaxMinEnum.MaxPrecision:
                argv.Add(StreamConst.MaxLen);
                argv.Add("=");
                break;
            case StreamMaxMinEnum.MaxNearly:
                argv.Add(StreamConst.MaxLen);
                argv.Add(StreamConst.ApproximateMaxLen);
                break;
            case StreamMaxMinEnum.MinIdPrecision:
                argv.Add(StreamConst.MinId);
                argv.Add("=");
                break;
            case StreamMaxMinEnum.MinIdNearly:
                argv.Add(StreamConst.MinId);
                argv.Add(StreamConst.ApproximateMaxLen);
                break;
        }

        if (maxMin != StreamMaxMinEnum.Default)
        {
            argv.Add(threshold);
        }

        if (limitCount != null)
        {
            argv.Add(StreamConst.Limit);
            argv.Add(limitCount.Value);
        }

        // The entry id comes after the options, then the field/value pairs.
        argv.Add(messageId);
        foreach (var item in data)
        {
            argv.Add(item.Key);
            argv.Add(item.Value);
        }

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        return await client.ExecuteAsync<string>(new Command(RedisCommandName.XAdd, argv.ToArray()));
    }


    /// <summary>
    /// Removes entries from a stream (<c>XDEL</c>).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="messageId">Ids of the entries to delete.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>The integer reply of the command.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/> or <paramref name="messageId"/> is null.
    /// </exception>
    public async Task<int> XDelAsync(string key, string[] messageId,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(messageId);

        // Argument layout: key, then every entry id.
        var argv = new List<object>(messageId.Length + 1) {key};
        argv.AddRange(messageId);

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        return await client.ExecuteAsync<int>(new Command(RedisCommandName.XDel, argv.ToArray()));
    }

    /// <summary>
    /// Gets the length of a stream (<c>XLEN</c>).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>The integer reply of the command.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is null.</exception>
    public async Task<int> XLenAsync(string key,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        cancellationToken.ThrowIfCancellationRequested();

        await using var client = await GetRedisClient(cancellationToken);
        var command = new Command(RedisCommandName.XLen, new object[] {key});
        return await client.ExecuteAsync<int>(command);
    }

    /// <summary>
    /// Trims a stream (<c>XTRIM</c>).
    /// </summary>
    /// <remarks>
    /// With <paramref name="maxMin"/> left at <c>Default</c> no trimming strategy is put into the
    /// command at all. NOTE(review): Redis documents the strategy as mandatory for XTRIM, so such
    /// a call is presumably rejected by the server - confirm and consider failing fast.
    /// </remarks>
    /// <param name="key">Key of the stream.</param>
    /// <param name="maxMin">Trimming strategy (max length or minimum id, exact or approximate).</param>
    /// <param name="threshold">Trimming threshold; required whenever <paramref name="maxMin"/> is not <c>Default</c>.</param>
    /// <param name="limitCount">
    /// Value for the <c>LIMIT</c> option (Redis 6.2.0+). Only allowed together with one of the
    /// "Nearly" (approximate) strategies.
    /// </param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>
    /// The reply of the command converted to a string by the client. Redis documents the reply as
    /// the number of entries removed.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/> is null, or <paramref name="threshold"/> is null while a trimming
    /// strategy is selected.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="limitCount"/> is given without an approximate ("Nearly") strategy.
    /// </exception>
    public async Task<string> XTrimAsync(string key,
        StreamMaxMinEnum maxMin = StreamMaxMinEnum.Default,
        string threshold = default, long? limitCount = null,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);

        // Validate the trimming options up front, before any argument is assembled.
        if (maxMin != StreamMaxMinEnum.Default)
        {
            // A trimming clause without a threshold would put a null into the command.
            ArgumentNullException.ThrowIfNull(threshold);
        }

        // LIMIT is only accepted by the server together with the approximate (~) form, so the
        // exact modes *and* the "no strategy" default have to be rejected.
        if (limitCount != null &&
            maxMin is not (StreamMaxMinEnum.MaxNearly or StreamMaxMinEnum.MinIdNearly))
        {
            throw new InvalidOperationException("当指定limitCount的时候，只能选择 Nearly");
        }

        var argv = new List<object> {key};

        // Trimming clause: strategy keyword followed by the exact/approximate marker.
        switch (maxMin)
        {
            case StreamMaxMinEnum.MaxPrecision:
                argv.Add(StreamConst.MaxLen);
                argv.Add("=");
                break;
            case StreamMaxMinEnum.MaxNearly:
                argv.Add(StreamConst.MaxLen);
                argv.Add(StreamConst.ApproximateMaxLen);
                break;
            case StreamMaxMinEnum.MinIdPrecision:
                argv.Add(StreamConst.MinId);
                argv.Add("=");
                break;
            case StreamMaxMinEnum.MinIdNearly:
                argv.Add(StreamConst.MinId);
                argv.Add(StreamConst.ApproximateMaxLen);
                break;
        }

        if (maxMin != StreamMaxMinEnum.Default)
        {
            argv.Add(threshold);
        }

        if (limitCount != null)
        {
            // Same constant XAddAsync uses for this token, instead of a string literal.
            argv.Add(StreamConst.Limit);
            argv.Add(limitCount.Value);
        }

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        return await client.ExecuteAsync<string>(new Command(RedisCommandName.XTrim, argv.ToArray()));
    }

    /// <summary>
    /// Creates a consumer group on a stream (<c>XGROUP CREATE</c>).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="groupName">Name of the group; it must be unique within the stream, a duplicate is an error.</param>
    /// <param name="id">
    /// Id that the new group treats as the last delivered entry. The special id <c>$</c> is the id
    /// of the last entry in the stream, but any valid id may be used. For example, to let the
    /// group's consumers read the whole stream from the beginning, pass zero:
    /// <c>XGROUP CREATE mystream mygroup 0</c>.
    /// </param>
    /// <param name="createStream">When true, the make-stream option is sent so a missing stream is created with length 0.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>True when the reply is exactly <c>OK</c>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/>, <paramref name="groupName"/> or <paramref name="id"/> is null.
    /// </exception>
    public async Task<bool> XGroupCreateAsync(string key, string groupName, string id = StreamConst.NewMessages,
        bool createStream = false,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(groupName);
        ArgumentNullException.ThrowIfNull(id);

        var argv = new List<object> {StreamConst.Create, key, groupName, id};
        argv.IfAdd(createStream, StreamConst.MkStream);

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var reply = await client.ExecuteAsync<string>(new Command(RedisCommandName.XGroup, argv.ToArray()));
        return reply is "OK";
    }

    /// <summary>
    /// Creates a consumer inside a consumer group (<c>XGROUP CREATECONSUMER</c>).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="groupName">Group the consumer is created in.</param>
    /// <param name="consumer">Name of the consumer.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>True when the integer reply is 1.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/>, <paramref name="groupName"/> or <paramref name="consumer"/> is null.
    /// </exception>
    public async Task<bool> XGroupCreateConsumerAsync(string key, string groupName, string consumer,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(groupName);
        ArgumentNullException.ThrowIfNull(consumer);

        var argv = new object[] {StreamConst.CreateConsumer, key, groupName, consumer};

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var created = await client.ExecuteAsync<int>(new Command(RedisCommandName.XGroup, argv));
        return created is 1;
    }

    /// <summary>
    /// Deletes a consumer from a consumer group (<c>XGROUP DELCONSUMER</c>).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="groupName">Group the consumer belongs to.</param>
    /// <param name="consumer">Name of the consumer.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>Number of pending messages the consumer owned before it was deleted.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/>, <paramref name="groupName"/> or <paramref name="consumer"/> is null.
    /// </exception>
    public async Task<int> XGroupDeleteConsumerAsync(string key, string groupName, string consumer,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(groupName);
        ArgumentNullException.ThrowIfNull(consumer);

        var argv = new object[] {StreamConst.DeleteConsumer, key, groupName, consumer};

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        return await client.ExecuteAsync<int>(new Command(RedisCommandName.XGroup, argv));
    }

    /// <summary>
    /// Destroys a consumer group of a stream together with all data belonging to it
    /// (<c>XGROUP DESTROY</c>).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="groupName">Group to destroy.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>
    /// The integer reply of the command. Redis documents it as the number of destroyed groups
    /// (0 or 1), not a pending-message count.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/> or <paramref name="groupName"/> is null.
    /// </exception>
    public async Task<int> XGroupDestroyAsync(string key, string groupName,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(groupName);

        var argv = new object[] {StreamConst.Destroy, key, groupName};

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        return await client.ExecuteAsync<int>(new Command(RedisCommandName.XGroup, argv));
    }

    /// <summary>
    /// Sets the last delivered id of a consumer group (<c>XGROUP SETID</c>).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="groupName">Group to update.</param>
    /// <param name="id">New last delivered id; use 0 to re-read everything, <c>$</c> for the newest entry.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>True when the reply is exactly <c>OK</c>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/>, <paramref name="groupName"/> or <paramref name="id"/> is null.
    /// </exception>
    public async Task<bool> XGroupSetIdAsync(string key, string groupName, string id,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(groupName);
        ArgumentNullException.ThrowIfNull(id);

        var argv = new object[] {StreamConst.SetId, key, groupName, id};

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var reply = await client.ExecuteAsync<string>(new Command(RedisCommandName.XGroup, argv));
        return reply is "OK";
    }

    /// <summary>
    /// Lists the consumer groups of a stream (<c>XINFO GROUPS</c>).
    /// </summary>
    /// <remarks>
    /// Only reply items that are lists with at least 12 elements are converted; anything else is
    /// skipped silently. NOTE(review): servers that report fewer fields per group (presumably
    /// versions without <c>entries-read</c>/<c>lag</c>) would therefore yield an empty result - confirm.
    /// </remarks>
    /// <param name="key">Key of the stream.</param>
    /// <param name="cancellationToken">
    /// Checked once before a client is acquired, passed to the client lookup and used while
    /// enumerating the reply.
    /// </param>
    /// <returns>One model per accepted reply item, in reply order.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is null.</exception>
    public async Task<List<XGroupInfoModel>> XGroupInfoAsync(string key,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        var argv = new object[] {StreamConst.Groups, key};

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var replies = client.ExecuteMoreResultAsync<object>(new Command(RedisCommandName.XInfo, argv));

        var groups = new List<XGroupInfoModel>();
        await foreach (var reply in replies.WithCancellation(cancellationToken))
        {
            if (reply is List<object> {Count: > 11} fields)
            {
                // Look the values up by field name, in the order the model constructor takes them.
                var name = Field(fields, "name");
                var consumers = Field(fields, "consumers").ToInt();
                var pending = Field(fields, "pending").ToInt();
                var lastDeliveredId = Field(fields, "last-delivered-id");
                var entriesRead = Field(fields, "entries-read");
                var lag = Field(fields, "lag").ToInt();
                groups.Add(new XGroupInfoModel(name, consumers, pending, lastDeliveredId, entriesRead, lag));
            }
        }

        return groups;

        // Text of the value stored under the given field name, or null when the lookup yields null.
        static string Field(List<object> source, string fieldName) =>
            StreamUtil<XConsumerInfoModel>.FindStreamValue(source, fieldName)?.ToString();
    }

    /// <summary>
    /// Lists the consumers of a consumer group (<c>XINFO CONSUMERS</c>).
    /// </summary>
    /// <remarks>
    /// Only reply items that are lists with at least 6 elements are converted; anything else is
    /// skipped silently.
    /// </remarks>
    /// <param name="key">Key of the stream.</param>
    /// <param name="group">Name of the consumer group.</param>
    /// <param name="cancellationToken">
    /// Checked once before a client is acquired, passed to the client lookup and used while
    /// enumerating the reply.
    /// </param>
    /// <returns>One model (name, pending, idle) per accepted reply item, in reply order.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/> or <paramref name="group"/> is null.
    /// </exception>
    public async Task<List<XConsumerInfoModel>> XConsumerInfoAsync(string key, string group,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(group);
        var argv = new object[] {StreamConst.Consumers, key, group};

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var replies = client.ExecuteMoreResultAsync<object>(new Command(RedisCommandName.XInfo, argv));

        var consumers = new List<XConsumerInfoModel>();
        await foreach (var reply in replies.WithCancellation(cancellationToken))
        {
            if (reply is List<object> {Count: > 5} fields)
            {
                // Look the values up by field name, in the order the model constructor takes them.
                var name = Field(fields, "name");
                var pending = Field(fields, "pending").ToInt();
                var idle = Field(fields, "idle").ToLong();
                consumers.Add(new XConsumerInfoModel(name, pending, idle));
            }
        }

        return consumers;

        // Text of the value stored under the given field name, or null when the lookup yields null.
        static string Field(List<object> source, string fieldName) =>
            StreamUtil<XConsumerInfoModel>.FindStreamValue(source, fieldName)?.ToString();
    }

    /// <summary>
    /// Gets general information about a stream (<c>XINFO STREAM</c>).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>
    /// A model built from the reply fields <c>length</c>, <c>radix-tree-keys</c>,
    /// <c>radix-tree-nodes</c>, <c>last-generated-id</c>, <c>max-deleted-entry-id</c>,
    /// <c>entries-added</c>, <c>recorded-first-entry-id</c> and <c>groups</c>.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is null.</exception>
    public async Task<XStreamInfoModel> XStreamInfoAsync(string key,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        var argv = new object[] {StreamConst.Stream, key};

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var reply = await client.ExecuteAsync<List<object>>(new Command(RedisCommandName.XInfo, argv));

        // Look the values up by field name, in the order the model constructor takes them.
        var length = Field("length").ToLong();
        var radixTreeKeys = Field("radix-tree-keys").ToLong();
        var radixTreeNodes = Field("radix-tree-nodes").ToLong();
        var lastGeneratedId = Field("last-generated-id");
        var maxDeletedEntryId = Field("max-deleted-entry-id");
        var entriesAdded = Field("entries-added").ToLong();
        var recordedFirstEntryId = Field("recorded-first-entry-id");
        var groups = Field("groups").ToInt();

        return new XStreamInfoModel(length, radixTreeKeys, radixTreeNodes, lastGeneratedId,
            maxDeletedEntryId, entriesAdded, recordedFirstEntryId, groups);

        // Text of the value stored under the given field name, or null when the lookup yields null.
        string Field(string fieldName) =>
            StreamUtil<XStreamInfoModel>.FindStreamValue(reply, fieldName)?.ToString();
    }

    /// <summary>
    /// Reads entries from one or more streams (<c>XREAD</c>).
    /// </summary>
    /// <param name="streamOffset">Streams to read; each item supplies the stream key and the offset to read from.</param>
    /// <param name="count">Maximum number of entries returned per stream.</param>
    /// <param name="blockTime">
    /// Optional blocking timeout. When given, the block option is sent with the timeout rounded up
    /// to whole milliseconds; when null the call does not block.
    /// </param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>The reply converted by <c>BuildStreamList</c>.</returns>
    /// <exception cref="InvalidOperationException"><paramref name="streamOffset"/> is null or empty.</exception>
    public async Task<List<StreamModel>> XReadAsync(ReadStreamOffset[] streamOffset, int count,
        TimeSpan? blockTime = null,
        CancellationToken cancellationToken = default)
    {
        if (streamOffset is not {Length: > 0})
        {
            throw new InvalidOperationException("key和messageIds需要一致");
        }

        var argv = new List<object>
        {
            StreamConst.Count,
            count
        };
        if (blockTime != null)
        {
            // The timeout must be preceded by the BLOCK keyword (as XReadGroupAsync does);
            // a bare number here makes the whole command a syntax error.
            argv.Add(StreamConst.Block);
            // Redis parses the timeout as an integer number of milliseconds. Round up so a
            // sub-millisecond timeout does not collapse to 0, which means "block forever".
            argv.Add((long)Math.Ceiling(blockTime.Value.TotalMilliseconds));
        }

        // All keys first, then all offsets in the same order.
        argv.Add(StreamConst.Streams);
        argv.AddRange(streamOffset.Select(a => a.Key));
        argv.AddRange(streamOffset.Select(a => a.Offset));

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var list = await
            client.ExecuteAsync<List<object>>(new Command(RedisCommandName.XRead, argv.ToArray()));

        return list.BuildStreamList();
    }

    /// <summary>
    /// Reads a range of entries from a stream in ascending id order (<c>XRANGE</c>).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="start">Id the range starts at; <c>-</c> stands for the smallest id.</param>
    /// <param name="end">Id the range ends at; <c>+</c> stands for the largest id.</param>
    /// <param name="count">Maximum number of entries to return.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>The reply converted by <c>BuildStreamEntityModel</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is null.</exception>
    public async Task<List<StreamEntityModel>> XRangeAsync(string key, string start, string end, int count,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        var argv = new object[] {key, start, end, StreamConst.Count, count};

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var reply = await client.ExecuteAsync<List<object>>(new Command(RedisCommandName.XRange, argv));
        return reply.BuildStreamEntityModel();
    }

    /// <summary>
    /// Reads a range of entries from a stream in reverse (descending id) order (<c>XREVRANGE</c>).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="start">Id the range starts at; <c>+</c> stands for the largest id.</param>
    /// <param name="end">Id the range ends at; <c>-</c> stands for the smallest id.</param>
    /// <param name="count">Maximum number of entries to return.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>The reply converted by <c>BuildStreamEntityModel</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is null.</exception>
    public async Task<List<StreamEntityModel>> XRevRangeAsync(string key, string start, string end, int count,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        var argv = new object[] {key, start, end, StreamConst.Count, count};

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var reply = await client.ExecuteAsync<List<object>>(new Command(RedisCommandName.XRevRange, argv));
        return reply.BuildStreamEntityModel();
    }

    /// <summary>
    /// Reads entries from one or more streams as a member of a consumer group (<c>XREADGROUP</c>).
    /// </summary>
    /// <param name="streamOffset">Streams to read; each item supplies the stream key and the offset to read from.</param>
    /// <param name="groupName">Name of the consumer group.</param>
    /// <param name="consumerName">Name of the consumer that reads.</param>
    /// <param name="count">Maximum number of entries per read.</param>
    /// <param name="blockTime">
    /// Optional blocking timeout, sent rounded up to whole milliseconds; when null the call does not block.
    /// </param>
    /// <param name="noAck">When true, the no-ack option is added to the command.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>The reply converted by <c>BuildStreamList</c>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="streamOffset"/> is null or empty, or <paramref name="groupName"/> or
    /// <paramref name="consumerName"/> is null.
    /// </exception>
    public async Task<List<StreamModel>> XReadGroupAsync(ReadGroupStreamOffset[] streamOffset, string groupName,
        string consumerName,
        int count, TimeSpan? blockTime = null, bool noAck = false,
        CancellationToken cancellationToken = default)
    {
        if (streamOffset is not {Length: > 0})
        {
            throw new ArgumentNullException(nameof(streamOffset));
        }

        ArgumentNullException.ThrowIfNull(groupName);
        ArgumentNullException.ThrowIfNull(consumerName);

        var argv = new List<object>
        {
            StreamConst.Group,
            groupName,
            consumerName,
            StreamConst.Count,
            count,
        };

        if (blockTime != null)
        {
            argv.Add(StreamConst.Block);
            // Redis parses the timeout as an integer number of milliseconds; a fractional double
            // would be rejected. Round up so a sub-millisecond timeout does not become 0,
            // which means "block forever".
            argv.Add((long)Math.Ceiling(blockTime.Value.TotalMilliseconds));
        }

        if (noAck)
        {
            argv.Add(StreamConst.NoAck);
        }

        // All keys first, then all offsets in the same order.
        argv.Add(StreamConst.Streams);
        argv.AddRange(streamOffset.Select(a => a.Key));
        argv.AddRange(streamOffset.Select(a => a.Offset));

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var list = await
            client.ExecuteAsync<List<object>>(new Command(RedisCommandName.XReadGroup, argv.ToArray()));

        return list.BuildStreamList();
    }

    /// <summary>
    /// Lists the pending (delivered but not yet acknowledged) messages of a consumer group,
    /// together with the consumer that owns each of them (<c>XPENDING</c>, extended form).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="groupName">Name of the consumer group.</param>
    /// <param name="count">Maximum number of entries to return.</param>
    /// <param name="consumerName">Optional consumer to restrict the listing to.</param>
    /// <param name="start">Smallest id to include; when null <c>StreamConst.MinValue</c> is sent.</param>
    /// <param name="end">Largest id to include; when null <c>StreamConst.MaxValue</c> is sent.</param>
    /// <param name="idle">Optional minimum idle time filter, sent as whole milliseconds.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>
    /// One model per reply entry, built from the entry's first four elements. A null reply gives
    /// an empty list; entries that are not lists of at least four elements are skipped.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/> or <paramref name="groupName"/> is null.
    /// </exception>
    public async Task<List<XPendingInfoModel>> XPendingAsync(string key, string groupName,
        int count, string consumerName = null, string start = null, string end = null, TimeSpan? idle = null,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(groupName);

        var argv = new List<object>
        {
            key,
            groupName,
        };
        if (idle != null)
        {
            argv.Add(StreamConst.Idle);
            // Redis parses min-idle-time as an integer number of milliseconds.
            argv.Add((long)idle.Value.TotalMilliseconds);
        }

        argv.Add(start ?? StreamConst.MinValue);
        argv.Add(end ?? StreamConst.MaxValue);
        argv.Add(count);
        argv.IfAdd(consumerName != null, consumerName);

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var list = await
            client.ExecuteAsync<List<object>>(new Command(RedisCommandName.XPending, argv.ToArray()));

        var resultList = new List<XPendingInfoModel>();
        if (list is null)
        {
            // Nothing to convert; avoid a NullReferenceException in the loop below.
            return resultList;
        }

        foreach (var itemList in list)
        {
            // Each entry is indexed at [0]..[3] below, so shorter entries are skipped too.
            if (itemList is not List<object> {Count: > 3} valueList)
            {
                continue;
            }

            resultList.Add(new XPendingInfoModel(valueList[0].ToString(), valueList[1].ToString(),
                valueList[2].ToString().ToLong(), valueList[3].ToString().ToInt()));
        }

        return resultList;
    }

    /// <summary>
    /// Transfers ownership of pending messages to another consumer and returns the claimed
    /// entries (<c>XCLAIM</c>).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="groupName">Name of the consumer group.</param>
    /// <param name="consumerName">Consumer that becomes the new owner.</param>
    /// <param name="messageIds">Ids of the messages to transfer.</param>
    /// <param name="minIdleTime">Minimum idle time a message must have to be claimed; sent as whole milliseconds.</param>
    /// <param name="idle">
    /// Sets the idle time (time since last delivery) of the message. When omitted Redis assumes 0,
    /// i.e. the counter is reset because the message now has a new owner trying to process it.
    /// Sent as whole milliseconds.
    /// </param>
    /// <param name="time">
    /// Same as <paramref name="idle"/>, but instead of a relative amount it sets the idle time to
    /// a specific Unix time in milliseconds (useful when rewriting an AOF file that generates
    /// XCLAIM commands). The value sent is the <see cref="TimeSpan"/>'s total milliseconds.
    /// </param>
    /// <param name="retryCount">
    /// Sets the retry counter to the given value. The counter is incremented every time a message
    /// is delivered again; XCLAIM normally leaves it unchanged and it is reported by XPENDING, so
    /// clients can detect messages that were never processed after many delivery attempts.
    /// </param>
    /// <param name="isForce">
    /// Creates the pending entry even when some of the ids are not yet in the pending list of
    /// another client. The message must still exist in the stream; ids of missing messages are ignored.
    /// </param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>The reply converted by <c>BuildStreamEntityModel</c>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/>, <paramref name="groupName"/>, <paramref name="consumerName"/> or
    /// <paramref name="messageIds"/> is null.
    /// </exception>
    public async Task<List<StreamEntityModel>> XClaimAsync(string key, string groupName, string consumerName,
        string[] messageIds,
        TimeSpan minIdleTime,
        TimeSpan? idle = default, TimeSpan? time = default,
        int? retryCount = default, bool isForce = false,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(groupName);
        ArgumentNullException.ThrowIfNull(consumerName);
        ArgumentNullException.ThrowIfNull(messageIds);

        // Redis parses every time value of XCLAIM as an integer number of milliseconds, so the
        // double TotalMilliseconds is converted to long before it goes into the command.
        var argv = new List<object>
        {
            key,
            groupName,
            consumerName,
            (long)minIdleTime.TotalMilliseconds
        };
        argv.AddRange(messageIds);
        if (idle != null)
        {
            argv.Add(StreamConst.Idle);
            argv.Add((long)idle.Value.TotalMilliseconds);
        }

        if (time != null)
        {
            argv.Add(StreamConst.Time);
            argv.Add((long)time.Value.TotalMilliseconds);
        }

        if (retryCount != null)
        {
            argv.Add(StreamConst.RetryCount);
            argv.Add(retryCount.Value);
        }

        argv.IfAdd(isForce, StreamConst.Force);

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var list = await
            client.ExecuteAsync<List<object>>(new Command(RedisCommandName.XClaim, argv.ToArray()));
        return list.BuildStreamEntityModel();
    }

    /// <summary>
    /// Transfers ownership of pending messages to another consumer and returns only the ids of
    /// the claimed messages (<c>XCLAIM</c> with the just-id option).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="groupName">Name of the consumer group.</param>
    /// <param name="consumerName">Consumer that becomes the new owner.</param>
    /// <param name="messageIds">Ids of the messages to transfer.</param>
    /// <param name="minIdleTime">Minimum idle time a message must have to be claimed; sent as whole milliseconds.</param>
    /// <param name="idle">
    /// Sets the idle time (time since last delivery) of the message. When omitted Redis assumes 0,
    /// i.e. the counter is reset because the message now has a new owner trying to process it.
    /// Sent as whole milliseconds.
    /// </param>
    /// <param name="time">
    /// Same as <paramref name="idle"/>, but instead of a relative amount it sets the idle time to
    /// a specific Unix time in milliseconds (useful when rewriting an AOF file that generates
    /// XCLAIM commands). The value sent is the <see cref="TimeSpan"/>'s total milliseconds.
    /// </param>
    /// <param name="retryCount">
    /// Sets the retry counter to the given value. The counter is incremented every time a message
    /// is delivered again; XCLAIM normally leaves it unchanged and it is reported by XPENDING, so
    /// clients can detect messages that were never processed after many delivery attempts.
    /// </param>
    /// <param name="isForce">
    /// Creates the pending entry even when some of the ids are not yet in the pending list of
    /// another client. The message must still exist in the stream; ids of missing messages are ignored.
    /// </param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>The ids yielded by the reply, in reply order.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/>, <paramref name="groupName"/>, <paramref name="consumerName"/> or
    /// <paramref name="messageIds"/> is null.
    /// </exception>
    public async Task<List<string>> XClaimWithIdAsync(string key, string groupName, string consumerName,
        string[] messageIds,
        TimeSpan minIdleTime,
        TimeSpan? idle = default, TimeSpan? time = default,
        int? retryCount = default, bool isForce = false,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(groupName);
        ArgumentNullException.ThrowIfNull(consumerName);
        ArgumentNullException.ThrowIfNull(messageIds);

        // Redis parses every time value of XCLAIM as an integer number of milliseconds, so the
        // double TotalMilliseconds is converted to long before it goes into the command.
        var argv = new List<object>
        {
            key,
            groupName,
            consumerName,
            (long)minIdleTime.TotalMilliseconds
        };
        argv.AddRange(messageIds);
        if (idle != null)
        {
            argv.Add(StreamConst.Idle);
            argv.Add((long)idle.Value.TotalMilliseconds);
        }

        if (time != null)
        {
            argv.Add(StreamConst.Time);
            argv.Add((long)time.Value.TotalMilliseconds);
        }

        if (retryCount != null)
        {
            argv.Add(StreamConst.RetryCount);
            argv.Add(retryCount.Value);
        }

        argv.IfAdd(isForce, StreamConst.Force);
        // Ask for ids only instead of full entries.
        argv.Add(StreamConst.JustId);

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var list =
            client.ExecuteMoreResultAsync<string>(new Command(RedisCommandName.XClaim, argv.ToArray()));
        return await list.ToListAsync();
    }

    /// <summary>
    /// Automatically transfers ownership of pending messages that have been idle long enough and
    /// returns only their ids (<c>XAUTOCLAIM</c> with the just-id option).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="groupName">Name of the consumer group.</param>
    /// <param name="consumerName">Consumer that becomes the new owner.</param>
    /// <param name="minIdleTime">Minimum idle time a message must have to be claimed; sent as whole milliseconds.</param>
    /// <param name="start">Id the scan starts at.</param>
    /// <param name="count">Upper bound on the number of messages to claim.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>
    /// A model holding the first reply element as text (the id to continue from), the claimed ids
    /// and, when the reply has exactly three elements and the third is a non-empty list, the ids
    /// from that list (otherwise null). Returns null when the reply has fewer than two elements or
    /// its second element is not a list.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/>, <paramref name="groupName"/> or <paramref name="consumerName"/> is null.
    /// </exception>
    public async Task<XAutoClaimWithIdModel> XAutoClaimWithIdAsync(string key, string groupName, string consumerName,
        TimeSpan minIdleTime, string start, int count = 100,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(groupName);
        ArgumentNullException.ThrowIfNull(consumerName);

        var argv = new List<object>
        {
            key,
            groupName,
            consumerName,
            // Redis parses min-idle-time as an integer number of milliseconds.
            (long)minIdleTime.TotalMilliseconds,
            start,
            StreamConst.Count,
            count,
            StreamConst.JustId
        };

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var list = await
            client.ExecuteAsync<List<object>>(new Command(RedisCommandName.XAutoClaim, argv.ToArray()));
        if (list is not {Count: > 1})
        {
            return default;
        }

        var nextId = list[0].ToString();
        if (list[1] is not List<object> entitys)
        {
            return default;
        }

        var entityStreams = new List<string>(entitys.Count);
        entityStreams.AddRange(entitys.Select(itemStreamId => itemStreamId.ToString()));

        // The third element is optional; it is only read when the reply has exactly three parts.
        List<string> deleteIds = null;
        if (list.Count == 3 && list[2] is List<object> {Count: > 0} deletes)
        {
            deleteIds = deletes.Select(item => item.ToString()).ToList();
        }

        return new(nextId, entityStreams, deleteIds);
    }

    /// <summary>
    /// Automatically transfers ownership of pending messages that have been idle long enough and
    /// returns the claimed entries (<c>XAUTOCLAIM</c>).
    /// </summary>
    /// <param name="key">Key of the stream.</param>
    /// <param name="groupName">Name of the consumer group.</param>
    /// <param name="consumerName">Consumer that becomes the new owner.</param>
    /// <param name="minIdleTime">Minimum idle time a message must have to be claimed; sent as whole milliseconds.</param>
    /// <param name="start">Id the scan starts at.</param>
    /// <param name="count">Upper bound on the number of messages to claim.</param>
    /// <param name="cancellationToken">Checked once before a client is acquired, then passed to the client lookup.</param>
    /// <returns>
    /// A model holding the first reply element as text (the id to continue from), the claimed
    /// entries converted by <c>BuildStreamEntityModel</c> and, when the reply has exactly three
    /// elements and the third is a non-empty list, the ids from that list (otherwise null).
    /// Returns null when the reply has fewer than two elements or its second element is not a list.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/>, <paramref name="groupName"/> or <paramref name="consumerName"/> is null.
    /// </exception>
    public async Task<XAutoClaimModel> XAutoClaimAsync(string key, string groupName, string consumerName,
        TimeSpan minIdleTime, string start, int count = 100,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(groupName);
        ArgumentNullException.ThrowIfNull(consumerName);

        var argv = new List<object>
        {
            key,
            groupName,
            consumerName,
            // Redis parses min-idle-time as an integer number of milliseconds.
            (long)minIdleTime.TotalMilliseconds,
            start,
            StreamConst.Count,
            count
        };

        cancellationToken.ThrowIfCancellationRequested();
        await using var client = await GetRedisClient(cancellationToken);
        var list = await
            client.ExecuteAsync<List<object>>(new Command(RedisCommandName.XAutoClaim, argv.ToArray()));
        if (list is not {Count: > 1})
        {
            return default;
        }

        var nextId = list[0].ToString();
        if (list[1] is not List<object> entitys)
        {
            return default;
        }

        var entityStreams = entitys.BuildStreamEntityModel();

        // The third element is optional; it is only read when the reply has exactly three parts.
        List<string> deleteIds = null;
        if (list.Count == 3 && list[2] is List<object> {Count: > 0} deletes)
        {
            deleteIds = deletes.Select(item => item.ToString()).ToList();
        }

        return new(nextId, entityStreams, deleteIds);
    }
}